import os
from unittest import TestCase, mock

import redis
from scrapy import Request, Spider
from scrapy.settings import Settings
from scrapy.utils.test import get_crawler

from scrapy_redis import connection
from scrapy_redis.dupefilter import RFPDupeFilter
from scrapy_redis.queue import FifoQueue, LifoQueue, PriorityQueue
from scrapy_redis.scheduler import Scheduler

# allow test settings from environment
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))


def get_spider(*args, **kwargs):
    crawler = get_crawler(
        spidercls=kwargs.pop("spidercls", None),
        settings_dict=kwargs.pop("settings_dict", None),
    )
    return crawler._create_spider(*args, **kwargs)


class RedisTestMixin:

    @property
    def server(self):
        if not hasattr(self, "_redis"):
            self._redis = redis.Redis(REDIS_HOST, REDIS_PORT)
        return self._redis

    def clear_keys(self, prefix):
        keys = self.server.keys(prefix + "*")
        if keys:
            self.server.delete(*keys)


class DupeFilterTest(RedisTestMixin, TestCase):

    def setUp(self):
        self.key = "scrapy_redis:tests:dupefilter:"
        self.df = RFPDupeFilter(self.server, self.key)

    def tearDown(self):
        self.clear_keys(self.key)

    def test_dupe_filter(self):
        req = Request("http://example.com")

        self.assertFalse(self.df.request_seen(req))
        self.assertTrue(self.df.request_seen(req))

        self.df.close("nothing")


class QueueTestMixin(RedisTestMixin):

    queue_cls = None

    def setUp(self):
        self.spider = get_spider(name="myspider")
        self.key = f"scrapy_redis:tests:{self.spider.name}:queue"
        self.q = self.queue_cls(self.server, Spider("myspider"), self.key)

    def tearDown(self):
        self.clear_keys(self.key)

    def test_clear(self):
        self.assertEqual(len(self.q), 0)

        for i in range(10):
            # XXX: can't use same url for all requests as SpiderPriorityQueue
            # uses redis' set implemention and we will end with only one
            # request in the set and thus failing the test. It should be noted
            # that when using SpiderPriorityQueue it acts as a request
            # duplication filter whenever the serielized requests are the same.
            # This might be unwanted on repetitive requests to the same page
            # even with dont_filter=True flag.
            req = Request(f"http://example.com/?page={i}")
            self.q.push(req)
        self.assertEqual(len(self.q), 10)

        self.q.clear()
        self.assertEqual(len(self.q), 0)


class FifoQueueTest(QueueTestMixin, TestCase):

    queue_cls = FifoQueue

    def test_queue(self):
        req1 = Request("http://example.com/page1")
        req2 = Request("http://example.com/page2")

        self.q.push(req1)
        self.q.push(req2)

        out1 = self.q.pop()
        out2 = self.q.pop(timeout=1)

        self.assertEqual(out1.url, req1.url)
        self.assertEqual(out2.url, req2.url)


class PriorityQueueTest(QueueTestMixin, TestCase):

    queue_cls = PriorityQueue

    def test_queue(self):
        req1 = Request("http://example.com/page1", priority=100)
        req2 = Request("http://example.com/page2", priority=50)
        req3 = Request("http://example.com/page2", priority=200)

        self.q.push(req1)
        self.q.push(req2)
        self.q.push(req3)

        out1 = self.q.pop()
        out2 = self.q.pop(timeout=0)
        out3 = self.q.pop(timeout=1)

        self.assertEqual(out1.url, req3.url)
        self.assertEqual(out2.url, req1.url)
        self.assertEqual(out3.url, req2.url)


class LifoQueueTest(QueueTestMixin, TestCase):

    queue_cls = LifoQueue

    def test_queue(self):
        req1 = Request("http://example.com/page1")
        req2 = Request("http://example.com/page2")

        self.q.push(req1)
        self.q.push(req2)

        out1 = self.q.pop()
        out2 = self.q.pop(timeout=1)

        self.assertEqual(out1.url, req2.url)
        self.assertEqual(out2.url, req1.url)


class SchedulerTest(RedisTestMixin, TestCase):

    def setUp(self):
        self.key_prefix = "scrapy_redis:tests:"
        self.queue_key = self.key_prefix + "%(spider)s:requests"
        self.dupefilter_key = self.key_prefix + "%(spider)s:dupefilter"
        self.spider = get_spider(
            name="myspider",
            settings_dict={
                "REDIS_HOST": REDIS_HOST,
                "REDIS_PORT": REDIS_PORT,
                "SCHEDULER_QUEUE_KEY": self.queue_key,
                "SCHEDULER_DUPEFILTER_KEY": self.dupefilter_key,
                "SCHEDULER_FLUSH_ON_START": False,
                "SCHEDULER_PERSIST": False,
                "SCHEDULER_SERIALIZER": "pickle",
                "DUPEFILTER_CLASS": "scrapy_redis.dupefilter.RFPDupeFilter",
            },
        )
        self.scheduler = Scheduler.from_crawler(self.spider.crawler)

    def tearDown(self):
        self.clear_keys(self.key_prefix)

    def test_scheduler(self):
        # default no persist
        self.assertFalse(self.scheduler.persist)

        self.scheduler.open(self.spider)
        self.assertEqual(len(self.scheduler), 0)

        req = Request("http://example.com")
        self.scheduler.enqueue_request(req)
        self.assertTrue(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), 1)

        # dupefilter in action
        self.scheduler.enqueue_request(req)
        self.assertEqual(len(self.scheduler), 1)

        out = self.scheduler.next_request()
        self.assertEqual(out.url, req.url)

        self.assertFalse(self.scheduler.has_pending_requests())
        self.assertEqual(len(self.scheduler), 0)

        self.scheduler.close("finish")

    def test_scheduler_persistent(self):
        # TODO: Improve this test to avoid the need to check for log messages.
        self.spider.log = mock.Mock(spec=self.spider.log)

        self.scheduler.persist = True
        self.scheduler.open(self.spider)

        self.assertEqual(self.spider.log.call_count, 0)

        self.scheduler.enqueue_request(Request("http://example.com/page1"))
        self.scheduler.enqueue_request(Request("http://example.com/page2"))

        self.assertTrue(self.scheduler.has_pending_requests())
        self.scheduler.close("finish")

        self.scheduler.open(self.spider)
        self.spider.log.assert_has_calls(
            [
                mock.call("Resuming crawl (2 requests scheduled)"),
            ]
        )
        self.assertEqual(len(self.scheduler), 2)

        self.scheduler.persist = False
        self.scheduler.close("finish")

        self.assertEqual(len(self.scheduler), 0)


class ConnectionTest(TestCase):

    # We can get a connection from just REDIS_URL.
    def test_redis_url(self):
        settings = Settings(
            {
                "REDIS_URL": "redis://foo:bar@localhost:9001/42",
            }
        )

        server = connection.from_settings(settings)
        connect_args = server.connection_pool.connection_kwargs

        self.assertEqual(connect_args["host"], "localhost")
        self.assertEqual(connect_args["port"], 9001)
        self.assertEqual(connect_args["password"], "bar")
        self.assertEqual(connect_args["db"], 42)

    # We can get a connection from REDIS_HOST/REDIS_PORT.
    def test_redis_host_port(self):
        settings = Settings(
            {
                "REDIS_HOST": "localhost",
                "REDIS_PORT": 9001,
            }
        )

        server = connection.from_settings(settings)
        connect_args = server.connection_pool.connection_kwargs

        self.assertEqual(connect_args["host"], "localhost")
        self.assertEqual(connect_args["port"], 9001)

    # REDIS_URL takes precedence over REDIS_HOST/REDIS_PORT.
    def test_redis_url_precedence(self):
        settings = Settings(
            {
                "REDIS_HOST": "baz",
                "REDIS_PORT": 1337,
                "REDIS_URL": "redis://foo:bar@localhost:9001/42",
            }
        )

        server = connection.from_settings(settings)
        connect_args = server.connection_pool.connection_kwargs

        self.assertEqual(connect_args["host"], "localhost")
        self.assertEqual(connect_args["port"], 9001)
        self.assertEqual(connect_args["password"], "bar")
        self.assertEqual(connect_args["db"], 42)

    # We fallback to REDIS_HOST/REDIS_PORT if REDIS_URL is None.
    def test_redis_host_port_fallback(self):
        settings = Settings(
            {"REDIS_HOST": "baz", "REDIS_PORT": 1337, "REDIS_URL": None}
        )

        server = connection.from_settings(settings)
        connect_args = server.connection_pool.connection_kwargs

        self.assertEqual(connect_args["host"], "baz")
        self.assertEqual(connect_args["port"], 1337)

    # We use default values for REDIS_HOST/REDIS_PORT.
    def test_redis_default(self):
        settings = Settings()

        server = connection.from_settings(settings)
        connect_args = server.connection_pool.connection_kwargs

        self.assertEqual(connect_args["host"], "localhost")
        self.assertEqual(connect_args["port"], 6379)
